package com.softwin.cryptocurrency.demo.ma;

import com.softwin.cryptocurrency.demo.security.JwtCallCredential;
import com.softwin.grpc.common.Common;
import com.softwin.marketdata.gateway.service.MarketDataManageGrpc;
import com.softwin.marketdata.gateway.service.MarketGateway;
import com.softwin.order.reporting.service.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Demo strategy that trades a moving-average crossover on the btc/usdt pair.
 *
 * <p>Every tick from the market-data gateway is fed into a short-period and a long-period
 * moving average. When the short average crosses above the long one the strategy buys a fixed
 * quantity (if flat) or cancels a pending sell; when it crosses below it sells (if long) or
 * cancels a pending buy. The strategy never goes short.
 *
 * <p>Threading: ticks, trade reports and order/cancel responses arrive through callbacks of
 * independent asynchronous gRPC calls, so they may run on different threads. State shared
 * between those callbacks is therefore kept in a {@link ConcurrentMap} (pending orders) and
 * guarded by {@code stateLock} (balances).
 *
 * <p>Created by Xiao Yang on 2018/07/03.
 */
public class CrossMovingAverage {
    private static final String SIDE_BUY = "buy";
    private static final String SIDE_SELL = "sell";
    /** Stored in {@link #orderMap} between sending an order and receiving its order id. */
    private static final String PLACEHOLDER_ORDER_ID = "placeHolderOrderId";
    /** Process exit status used when the strategy detects an inconsistent state. */
    private static final int ABNORMAL_EXIT_STATUS = 1;

    // NOTE(review): endpoint and credentials are hard-coded for this demo; move them to
    // configuration before using this against a real account.
    private final String host = "140.207.81.249"; // server address
    private final OrderReportingGrpc.OrderReportingStub tgStub; // trading gateway
    private final MarketDataManageGrpc.MarketDataManageStub mgStub; // market-data gateway
    private final QueryServiceGrpc.QueryServiceStub queryStub; // query gateway
    private final Common.Symbol btcusdtSymbol; // traded pair
    private BigDecimal lastPrice; // latest price, only touched by the tick callback
    private final MaCalculator longTimeMa; // long-period average
    private final MaCalculator shortTimeMa; // short-period average
    private BigDecimal priceDiff; // previous (short MA - long MA), kept for logging
    private int lastNonZeroDiffSign; // sign of the most recent non-zero MA difference, 0 if none yet
    private final String subaccountCode = "okex_softwin_test"; // sub account
    private final String username = "openTrader"; // trader user name
    private final String password = "66666"; // trader password
    private final String quantity = "0.0001"; // order size
    private final BigDecimal orderQuantity = new BigDecimal(quantity);
    /** Orders waiting to be filled: key is the side (buy/sell), value is the order id. */
    private final ConcurrentMap<String, String> orderMap = new ConcurrentHashMap<>();
    /** Guards {@link #btcBalance}/{@link #initBtcBalance} and makes a fill atomic for {@link #getState()}. */
    private final Object stateLock = new Object();
    private BigDecimal btcBalance; // current btc position, null until the initial query answers
    private BigDecimal initBtcBalance; // btc position when the strategy started

    /**
     * Starts the strategy with a 5-tick short average and a 30-tick long average and lets it
     * run for 200 seconds.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args)  throws InterruptedException {
        CrossMovingAverage cma = new CrossMovingAverage(5, 30);
        cma.subMarketData();
        Thread.sleep(200000);
    }

    /**
     * Creates the two moving-average calculators, opens the gRPC channels, logs in to obtain a
     * token, builds the authenticated trading/query stubs, sets up the traded symbol and finally
     * calls {@link #init()}.
     *
     * @param shortTimePeriod length of the short period
     * @param longTimePeriod length of the long period
     */
    private CrossMovingAverage(int shortTimePeriod, int longTimePeriod){
        longTimeMa = new MaCalculator(longTimePeriod);
        shortTimeMa = new MaCalculator(shortTimePeriod);
        ManagedChannel mgChannel = ManagedChannelBuilder.forAddress(host, 8991).usePlaintext().build();
        // market-data gateway
        mgStub = MarketDataManageGrpc.newStub(mgChannel);
        ManagedChannel tgChannel = ManagedChannelBuilder.forAddress(host, 8993).usePlaintext().build();
        // log in and fetch the token
        LoginGrpc.LoginBlockingStub loginBlockingStub = LoginGrpc.newBlockingStub(tgChannel);
        LoginOuterClass.RspLogin rspLogin = loginBlockingStub.login(LoginOuterClass.ReqLogin.newBuilder().setUsername(username).setPassword(password).build());
        String token = rspLogin.getToken();
        // trading and query gateways share the trading channel and carry the token
        tgStub = OrderReportingGrpc.newStub(tgChannel).withCallCredentials(new JwtCallCredential(token));
        queryStub = QueryServiceGrpc.newStub(tgChannel).withCallCredentials(new JwtCallCredential(token));

        btcusdtSymbol = Common.Symbol.newBuilder().setQuoteCurrency("usdt").setBaseCurrency("btc").setExchangeType("huobi").build();
        init();
    }

    /**
     * Subscribes to trade reports (a full fill updates the balance and removes the order from
     * {@link #orderMap}) and queries the initial btc position.
     */
    private void init(){
        // subscribe to trade reports
        StreamObserver<Orderreporting.ReqSubTrade> reqSubTradeStreamObserver = tgStub.subTrade(new StreamObserver<Orderreporting.RtnSubTrade>() {
            @Override
            public void onNext(Orderreporting.RtnSubTrade value) {
                System.out.println(value);
                // Only complete fills are handled, and fees are ignored.
                if (new BigDecimal(value.getTrade().getQuantity()).compareTo(orderQuantity) != 0) {
                    return;
                }
                if (value.getTrade().getSide().equals(SIDE_BUY)) {
                    System.out.println("买单已成交");
                    applyFill(SIDE_BUY, orderQuantity);
                } else if (value.getTrade().getSide().equals(SIDE_SELL)) {
                    System.out.println("卖单已成交");
                    applyFill(SIDE_SELL, orderQuantity.negate());
                }
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {

            }
        });
        // Do not drop this call: it is the request that actually starts the subscription.
        reqSubTradeStreamObserver.onNext(Orderreporting.ReqSubTrade.newBuilder().build());

        // query the initial position
        queryStub.getSubaccounts(Orderreporting.SubaccountRequest.newBuilder().setSubaccountCode(subaccountCode).setCurrency("btc").build(), new StreamObserver<Orderreporting.SubaccountResponse>() {
            @Override
            public void onNext(Orderreporting.SubaccountResponse value) {
                // Anything other than exactly one account with one asset is treated as "no btc".
                BigDecimal balance = BigDecimal.ZERO;
                if (value.getDataCount() == 1 && value.getData(0).getAssetBalanceCount() == 1) {
                    balance = new BigDecimal(value.getData(0).getAssetBalance(0).getFree());
                }
                // Publish both values together so getState() never sees only one of them.
                synchronized (stateLock) {
                    btcBalance = balance;
                    initBtcBalance = balance;
                }
                System.out.println("初始状态，btc余额： " + balance);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {

            }
        });
    }

    /**
     * Books a complete fill: adjusts the btc balance and drops the pending order of that side
     * in one step, so {@link #getState()} never observes the new balance together with the
     * already-filled order.
     *
     * @param side {@code "buy"} or {@code "sell"}
     * @param signedQuantity filled quantity, positive for a buy and negative for a sell
     */
    private void applyFill(String side, BigDecimal signedQuantity) {
        BigDecimal updated = null;
        synchronized (stateLock) {
            if (btcBalance != null) {
                updated = btcBalance.add(signedQuantity);
                btcBalance = updated;
            }
            orderMap.remove(side);
        }
        if (updated == null) {
            // The initial balance query has not answered (or failed); there is nothing to adjust.
            System.out.println("initial btc balance unknown, balance not updated for " + side + " fill");
        } else {
            System.out.println("btc余额： " + updated);
        }
    }

    /**
     * Places a limit buy at the latest price; the order id is stored in {@link #orderMap} once
     * the response arrives.
     */
    private void buy(){
        placeOrder(SIDE_BUY);
    }

    /**
     * Places a limit sell at the latest price; the order id is stored in {@link #orderMap} once
     * the response arrives.
     */
    private void sell(){
        placeOrder(SIDE_SELL);
    }

    /**
     * Sends a limit order for {@link #quantity} at the latest price, rounded to two decimals.
     *
     * @param side {@code "buy"} or {@code "sell"}
     */
    private void placeOrder(final String side) {
        String price = lastPrice.setScale(2, RoundingMode.HALF_EVEN).toString();
        final Orderreporting.InsertOrderRequest orderTradeRequest = Orderreporting.InsertOrderRequest.newBuilder()
                .setSubaccountCode(subaccountCode).setSymbol(btcusdtSymbol).setQuantity(quantity).setPrice(price)
                .setType("limit").setSide(side).build();
        System.out.println((SIDE_BUY.equals(side) ? "报单买入： 价格" : "报单卖出： 价格") + price);
        // Put a placeholder first so that ticks arriving before the response do not trigger a second order.
        orderMap.put(side, PLACEHOLDER_ORDER_ID);
        tgStub.insertOrder(orderTradeRequest, new StreamObserver<Orderreporting.InsertOrderResponse>() {
            @Override
            public void onNext(Orderreporting.InsertOrderResponse value) {
                if (!value.getCRsp().getIsSuccess()) {
                    System.out.println("报单失败: " + value.getCRsp().getErrMsg());
                    orderMap.remove(side, PLACEHOLDER_ORDER_ID);
                } else {
                    // Swap the placeholder for the real id. If the placeholder is already gone the
                    // fill report overtook this response, and the finished order must not be re-added.
                    orderMap.replace(side, PLACEHOLDER_ORDER_ID, value.getOrderId());
                    System.out.println("报单成功：" + value.getOrderId());
                }
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("报单失败: " + t.getMessage());
                // Only the placeholder is dropped; an id that was already confirmed stays tracked.
                orderMap.remove(side, PLACEHOLDER_ORDER_ID);
            }

            @Override
            public void onCompleted() {

            }
        });
    }

    /**
     * Cancels an order; on a successful response the order is removed from {@link #orderMap}.
     *
     * @param orderId id of the order to cancel; {@code null} (order already gone) and the
     *                placeholder (order not yet accepted) are ignored
     */
    private void cancel(String orderId) {
        if (orderId == null || PLACEHOLDER_ORDER_ID.equals(orderId)) {
            return;
        }
        final Orderreporting.CancelOrderRequest cancelOrderRequest = Orderreporting.CancelOrderRequest.newBuilder().setOrderId(orderId).build();
        tgStub.cancelOrder(cancelOrderRequest, new StreamObserver<Orderreporting.CancelOrderResponse>() {
            @Override
            public void onNext(Orderreporting.CancelOrderResponse value) {
                if (!value.getCRsp().getIsSuccess()) {
                    System.out.println("撤单失败： " + value.getCRsp().getErrMsg());
                    return;
                }
                // remove(key, value) only drops the entry when it still holds the cancelled id.
                if (orderMap.remove(SIDE_BUY, value.getData().getOrderId())) {
                    System.out.println("取消买单成功");
                } else if (orderMap.remove(SIDE_SELL, value.getData().getOrderId())) {
                    System.out.println("取消卖单成功");
                }
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {

            }
        });
    }

    /**
     * Subscribes to ticks of the traded symbol. Each tick is pushed into both moving averages;
     * on a crossover the strategy places or cancels orders depending on its current state.
     */
    private void subMarketData(){
        StreamObserver<MarketGateway.ReqSubTick> requestObserver = mgStub.subTick(new StreamObserver<MarketGateway.TickDataResponse>() {
            @Override
            public void onNext(MarketGateway.TickDataResponse value) {
                if (!value.getCRsp().getIsSuccess()){
                    return;
                }
                MarketGateway.TickData tickData = value.getData();
                lastPrice = new BigDecimal(tickData.getClose());
                longTimeMa.newNum(lastPrice);
                shortTimeMa.newNum(lastPrice);
                switch (checkPriceDiff()){
                    case 1: {
                        // go long
                        System.out.println("上穿，应该做多");
                        int state = getState();
                        if (state == 0) {
                            System.out.println("无仓，买入");
                            buy();
                        } else if (state == 3) {
                            System.out.println("挂了卖单等待成交，应该撤掉");
                            cancel(orderMap.get(SIDE_SELL));
                        }
                        break;
                    }
                    case -1: {
                        // close the long position (this demo never goes short)
                        System.out.println("下穿，应该做空");
                        int state = getState();
                        if (state == 2) {
                            System.out.println("多仓，卖出");
                            sell();
                        } else if (state == 1) {
                            System.out.println("挂了买单等待成交，应该撤掉");
                            cancel(orderMap.get(SIDE_BUY));
                        }
                        break;
                    }
                    case 0:
                        // no crossover
                        System.out.println("不变");
                        break;
                }
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {

            }
        });
        MarketGateway.ReqSubTick reqSubTick = MarketGateway.ReqSubTick.newBuilder().setSymbol(btcusdtSymbol).build();
        requestObserver.onNext(reqSubTick);
    }

    /**
     * Detects whether the short moving average crossed the long one on the latest tick.
     *
     * <p>The comparison uses the sign of the most recent <em>non-zero</em> difference, so a
     * crossover that passes through an exactly equal pair of averages (negative, zero, positive)
     * is still reported once the difference changes sign. Nothing is evaluated until both
     * averages are positive.
     *
     * @return 1 if the short average crossed above the long one, -1 if it crossed below,
     *         0 otherwise
     */
    private int checkPriceDiff() {
        int result = 0;
        BigDecimal longTimeMaValue = longTimeMa.getAvg();
        BigDecimal shortTimeMaValue = shortTimeMa.getAvg();
        if (longTimeMaValue.compareTo(BigDecimal.ZERO) > 0 && shortTimeMaValue.compareTo(BigDecimal.ZERO) > 0) {
            BigDecimal newPriceDiff = shortTimeMaValue.subtract(longTimeMaValue);
            int currentSign = newPriceDiff.signum();
            if (priceDiff != null) {
                System.out.println("previous price diff : " + priceDiff + ", latest price diff: " + newPriceDiff + ", shortPeriod: " + shortTimeMaValue + ", longPeriod" + longTimeMaValue);
                if (lastNonZeroDiffSign < 0 && currentSign > 0) {
                    result = 1;
                } else if (lastNonZeroDiffSign > 0 && currentSign < 0) {
                    result = -1;
                }
            }
            if (currentSign != 0) {
                lastNonZeroDiffSign = currentSign;
            }
            priceDiff = newPriceDiff;
        }
        return result;
    }

    /**
     * Derives the strategy state from the btc balance and the pending orders.
     *
     * <p>A combination that cannot occur in a long-only strategy (long with a pending buy, flat
     * with a pending sell, balance below the initial one) terminates the process with a non-zero
     * exit status.
     *
     * @return 0 flat, 1 buy pending, 2 long, 3 sell pending, -1 when the initial balance is
     *         not known yet
     */
    private int getState(){
        final BigDecimal balance;
        final BigDecimal initial;
        final boolean hasBuy;
        final boolean hasSell;
        // Take one consistent snapshot; fills update balance and orders under the same lock.
        synchronized (stateLock) {
            balance = btcBalance;
            initial = initBtcBalance;
            hasBuy = orderMap.get(SIDE_BUY) != null;
            hasSell = orderMap.get(SIDE_SELL) != null;
        }
        if (balance == null || initial == null) {
            System.out.println("initial btc balance not available yet, no action taken");
            return -1;
        }

        int comparison = balance.compareTo(initial);
        if (comparison > 0) {
            // long
            if (hasBuy) {
                System.out.println("多仓状态有买单，异常退出");
                System.exit(ABNORMAL_EXIT_STATUS);
            }
            return hasSell ? 3 : 2;
        }
        if (comparison == 0) {
            // flat
            if (hasSell) {
                System.out.println("无仓状态有卖单，异常退出");
                System.exit(ABNORMAL_EXIT_STATUS);
            }
            return hasBuy ? 1 : 0;
        }
        // short position, which this demo never opens
        System.out.println("btc余额["+balance.toString()+"]小于初始值["+initial+"]，异常退出");
        System.exit(ABNORMAL_EXIT_STATUS);
        return -1;
    }
}
